fix: make signature aggregation asynchronous #299
🤖 Claude Code Review
Now I have a full picture of the PR. Let me write up the review. PR #299 —
🤖 Codex Code Review
Findings
No other obvious consensus or memory-safety regressions stood out in the diff. I couldn’t run
Automated review by OpenAI Codex · gpt-5.4 · custom prompt
Greptile Summary
This PR moves committee-signature aggregation (XMSS proofs, 400–1200 ms each) off the
Confidence Score: 5/5
Safe to merge; the one finding is a P2 metric/logging inconsistency with no consensus impact.
All remaining findings are P2: the missing session-id fence in AggregationDone affects metric gauge timing and log clarity in a rare pathological case (a prior worker outliving its 2 s join timeout), but does not corrupt store state or affect fork-choice correctness. The core architecture (snapshot isolation, session fencing on AggregateProduced, CancellationToken deadline, clean #[stopped] shutdown) is sound and validated by the 200-slot partition devnet run.
crates/blockchain/src/lib.rs: AggregationDone handler (lines 723-741)
| Filename | Overview |
|---|---|
| crates/blockchain/src/lib.rs | Core actor refactor: moves XMSS aggregation off the actor thread via spawn_blocking, adds session lifecycle (AggregationSession, three new message types, #[stopped] hook). One P2: AggregationDone handler is missing the session-id fence that AggregateProduced correctly applies. |
| crates/blockchain/src/store.rs | Extracted pure snapshot/job/apply/finalize helpers for off-thread aggregation; removed inline aggregate_committee_signatures. on_tick interval-2 arm is now a no-op with a clear comment. Changes look correct. |
| crates/blockchain/src/metrics.rs | Added observe_committee_signatures_aggregation using a Duration argument (instead of a drop-guard) to support cross-thread measurement. All existing metrics preserved. |
| crates/blockchain/Cargo.toml | Adds tokio-util 0.7 (CancellationToken) with default-features = false. Only this crate uses tokio-util in the workspace; compiles cleanly per CI checks. |
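The switch from a drop-guard to an explicit `Duration` argument in `metrics.rs` can be illustrated with a minimal, std-only sketch. Everything here is hypothetical: `Histogram` and `measure_off_thread` are stand-ins, not the crate's actual metrics API. The point is only that the worker measures elapsed time on its own thread and hands the value back, so the recording side never needs a guard that lives across threads.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a histogram metric: it only stores samples.
#[derive(Default)]
struct Histogram {
    samples: Vec<Duration>,
}

impl Histogram {
    /// Takes an explicit `Duration`, so the measurement can come from another thread.
    fn observe(&mut self, elapsed: Duration) {
        self.samples.push(elapsed);
    }
}

/// Times `work` on a worker thread and records the result on the calling thread.
fn measure_off_thread(hist: &mut Histogram, work: fn()) -> Duration {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        let start = Instant::now();
        work();
        // The worker reports its own elapsed time instead of holding a guard.
        let _ = tx.send(start.elapsed());
    });
    let elapsed = rx.recv().expect("worker dropped the channel");
    worker.join().expect("worker panicked");
    hist.observe(elapsed);
    elapsed
}
```

In the PR the elapsed value travels inside the `AggregationDone` message (`msg.total_elapsed`) rather than over a dedicated channel; the channel above is only there to keep the sketch self-contained.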
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["Interval 2 tick"] --> B["snapshot_aggregation_inputs()"]
B -->|"None: nothing to do"| Z["Skip session"]
B -->|"Some(snapshot)"| C["spawn_blocking worker"]
C --> D["send_after 750ms AggregationDeadline"]
C --> E["run_aggregation_worker loop"]
E --> F{"cancel.is_cancelled?"}
F -->|"Yes"| G["Break loop"]
F -->|"No"| H["aggregate_job() XMSS proof"]
H --> I["Send AggregateProduced to actor"]
I --> J["apply_aggregated_group + publish P2P"]
J --> E
G --> K["Send AggregationDone to actor"]
H --> K
D --> L["AggregationDeadline fires"]
L --> M["cancel.cancel()"]
K --> N["finalize_aggregation_session gauge refresh"]
O["#stopped hook"] --> P["cancel.cancel() + join worker 2s timeout"]
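The worker loop in the flowchart can be sketched roughly as follows. This is a std-only approximation under stated assumptions: an `AtomicBool` stands in for `tokio_util`'s `CancellationToken`, `std::thread::spawn` for `spawn_blocking`, and an `mpsc` channel for the actor mailbox. The message names mirror the PR, but their fields and the `run_worker` / `run_session` helpers are illustrative only.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;

/// Messages the worker sends back to the actor (fields are illustrative).
#[derive(Debug, PartialEq)]
enum WorkerMsg {
    AggregateProduced { session_id: u64, group: usize },
    AggregationDone { session_id: u64, groups_aggregated: usize, cancelled: bool },
}

/// Hypothetical worker loop: check the cancel flag before each job, stream each
/// result back, and always finish with exactly one `AggregationDone`.
fn run_worker(session_id: u64, jobs: usize, cancel: Arc<AtomicBool>, tx: Sender<WorkerMsg>) {
    let mut groups_aggregated = 0;
    for group in 0..jobs {
        if cancel.load(Ordering::Acquire) {
            break; // deadline fired or actor is stopping
        }
        // Stand-in for the expensive `aggregate_job()` XMSS proof.
        groups_aggregated += 1;
        let _ = tx.send(WorkerMsg::AggregateProduced { session_id, group });
    }
    let cancelled = cancel.load(Ordering::Acquire);
    let _ = tx.send(WorkerMsg::AggregationDone { session_id, groups_aggregated, cancelled });
}

/// Runs one session on a background thread and collects everything it sent.
fn run_session(session_id: u64, jobs: usize, cancel_before_start: bool) -> Vec<WorkerMsg> {
    let cancel = Arc::new(AtomicBool::new(cancel_before_start));
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn({
        let cancel = Arc::clone(&cancel);
        move || run_worker(session_id, jobs, cancel, tx)
    });
    handle.join().expect("worker panicked");
    // The only sender was moved into the worker, so the iterator ends here.
    rx.iter().collect()
}
```

Checking the flag only between jobs means a job that is already running is never interrupted, which matches the PR's note that post-deadline aggregates are still applied and published.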
Prompt To Fix All With AI
This is a comment left during a code review.
Path: crates/blockchain/src/lib.rs
Line: 723-741
Comment:
**`AggregationDone` missing session-id fence**
`AggregateProduced` guards against stale sessions (`current != Some(msg.session_id)` → drop), but `AggregationDone` has no such guard. In the documented pathological case — a prior worker that outlives its 2 s join timeout — it can arrive while session N+1 is actively aggregating. This would (a) call `finalize_aggregation_session` mid-session N+1, snapshotting a partially-applied gossip-signatures gauge, and (b) emit a "Committee signatures aggregated" log line attributed to the old session id, making the devnet trace misleading.
```suggestion
impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        let current = self.current_aggregation.as_ref().map(|s| s.session_id);
        if current != Some(msg.session_id) {
            trace!(
                incoming_session_id = msg.session_id,
                current_session_id = ?current,
                "Dropping stale aggregation done for non-current session"
            );
            return;
        }
        store::finalize_aggregation_session(&self.store);
        metrics::observe_committee_signatures_aggregation(msg.total_elapsed);
        let aggregation_elapsed = msg.total_elapsed;
        info!(
            ?aggregation_elapsed,
            session_id = msg.session_id,
            groups_considered = msg.groups_considered,
            groups_aggregated = msg.groups_aggregated,
            total_raw_sigs = msg.total_raw_sigs,
            total_children = msg.total_children,
            cancelled = msg.cancelled,
            aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
            "Committee signatures aggregated"
        );
    }
}
```
How can I resolve this? If you propose a fix, please make it concise.
Reviews (1): Last reviewed commit: "refactor: simplify"
    impl Handler<AggregationDone> for BlockChainServer {
        async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
            store::finalize_aggregation_session(&self.store);
            metrics::observe_committee_signatures_aggregation(msg.total_elapsed);

            let aggregation_elapsed = msg.total_elapsed;
            info!(
                ?aggregation_elapsed,
                session_id = msg.session_id,
                groups_considered = msg.groups_considered,
                groups_aggregated = msg.groups_aggregated,
                total_raw_sigs = msg.total_raw_sigs,
                total_children = msg.total_children,
                cancelled = msg.cancelled,
                aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
                "Committee signatures aggregated"
            );
        }
    }
AggregationDone missing session-id fence
AggregateProduced guards against stale sessions (current != Some(msg.session_id) → drop), but AggregationDone has no such guard. In the documented pathological case — a prior worker that outlives its 2 s join timeout — it can arrive while session N+1 is actively aggregating. This would (a) call finalize_aggregation_session mid-session N+1, snapshotting a partially-applied gossip-signatures gauge, and (b) emit a "Committee signatures aggregated" log line attributed to the old session id, making the devnet trace misleading.
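The fence itself reduces to one comparison, shown below as a standalone sketch. The `AggregationSession` struct here is a hypothetical one-field stand-in for the real type, and `Fence` / `fence` are illustrative names that do not appear in the PR; the logic mirrors the `current != Some(msg.session_id)` check used by `AggregateProduced`.

```rust
/// Hypothetical minimal session state; the real `AggregationSession` holds more.
struct AggregationSession {
    session_id: u64,
}

/// Outcome of the fence check for an incoming worker message.
#[derive(Debug, PartialEq)]
enum Fence {
    Accept,
    DropStale,
}

/// Accept a message only if it belongs to the session currently in flight.
/// A missing current session also counts as stale.
fn fence(current: Option<&AggregationSession>, incoming_session_id: u64) -> Fence {
    match current.map(|s| s.session_id) {
        Some(id) if id == incoming_session_id => Fence::Accept,
        _ => Fence::DropStale,
    }
}
```

Applying the same check in both handlers keeps a straggler from session N from finalizing or logging on behalf of session N+1.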
Summary
- Moves committee-signature aggregation off the `BlockChainServer` actor thread onto a `tokio::task::spawn_blocking` worker. The actor no longer blocks for the duration of the XMSS proofs (previously 400-1200 ms per slot).
- The worker is stopped by a `CancellationToken` fired by a 750 ms deadline (self-message via `send_after`). Aggregates produced inside the window are streamed back via `AggregateProduced` actor messages and published immediately; post-deadline aggregates still get applied locally and published.
- A `#[stopped]` lifecycle hook cancels the in-flight worker and joins it (bounded at 2 s) so node shutdown is clean; new-session start joins any straggler from the previous slot and warns if it hadn't finished yet.

Architecture
- `crates/blockchain/src/store.rs` now exposes pure helpers: `snapshot_aggregation_inputs`, `aggregate_job`, `apply_aggregated_group`, `finalize_aggregation_session`. The old `aggregate_committee_signatures` / `try_aggregate` (single inline function) are gone.
- `store::on_tick` no longer calls into aggregation; the actor drives it.
- Three new message types (`pub(crate)`): `AggregateProduced`, `AggregationDone`, `AggregationDeadline`. Late messages are fenced by a session id (the slot number).
- Per-aggregate work on the actor (`apply_aggregated_group`): update `new_payloads`, delete consumed gossip sigs, publish via gossipsub. End-of-session work (gauge refresh) is batched in `AggregationDone` to avoid 2N lock acquisitions per slot.

Observations from devnet runs
Under a 4-node devnet with three induced partitions (via `docker pause`) across 200 slots:
- `groups_considered > 1`
- `Prior aggregation worker still running` warnings: worker wind-down is clean

Test plan
- `cargo check --workspace`
- `make fmt`, `make lint` (clippy clean)
- `cargo test -p ethlambda-blockchain --test forkchoice_spectests`: 77/77 pass